# Notebook setup: shared imports, plotting defaults, the Spark session and the
# raw Lightcast job-postings DataFrame (`df`) used by the later cells.
import re

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import plotly.io as pio
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    col,
    explode,
    monotonically_increasing_id,
    regexp_replace,
    split,
    transform,
    when,
)

# Seed NumPy's global RNG so later random draws (e.g. plot jitter) are reproducible.
np.random.seed(42)

# Render Plotly figures with the notebook renderer. This is the only renderer
# assignment; a second, earlier assignment would simply be overwritten.
pio.renderers.default = "notebook"

# Initialize Spark Session
spark = SparkSession.builder.appName("./data/LightcastData").getOrCreate()

# Load Data. multiLine lets quoted fields span line breaks; escape='"' makes a
# doubled quote inside a quoted field read as a literal quote character.
df = (
    spark.read.option("header", "true")
    .option("inferSchema", "true")
    .option("multiLine", "true")
    .option("escape", "\"")
    .csv("./data/lightcast_job_postings.csv")
)

# Diagnostics (disabled for the rendered submission):
# df.printSchema()
# df.show(5)
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/24 21:07:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/09/24 21:07:07 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
[Stage 0:===========================================================(1 + 0) / 1] [Stage 1:> (0 + 1) / 1]
# Q2: box plot of posted salaries per NAICS2 industry, with industries ordered
# left-to-right by descending median salary. Saves HTML and SVG to output/.
import os
import re

import pandas as pd
import plotly.express as px

# Only rows with a positive salary; the chart needs just industry and salary.
pdf = df.filter(df["SALARY"] > 0).select("NAICS2_NAME", "SALARY").toPandas()

# Remove non-ASCII characters and surrounding whitespace from industry names.
# Nulls become "" and are dropped below. Vectorized equivalent of applying
# re.sub + strip row by row.
pdf["NAICS2_NAME"] = (
    pdf["NAICS2_NAME"]
    .fillna("")
    .astype(str)
    .str.replace(r"[^\x00-\x7F]+", "", regex=True)
    .str.strip()
)
# .copy() so the column assignment below acts on an independent frame, not a
# view of the filtered slice (avoids pandas' SettingWithCopyWarning).
pdf = pdf[pdf["NAICS2_NAME"] != ""].copy()

median_salaries = pdf.groupby("NAICS2_NAME")["SALARY"].median()
sorted_industries = median_salaries.sort_values(ascending=False).index
pdf["NAICS2_NAME"] = pd.Categorical(
    pdf["NAICS2_NAME"], categories=sorted_industries, ordered=True
)

# One labelled y tick every $100K, extended to cover the largest salary so
# values above a fixed cap are not left without a label.
tick_step = 100_000
max_salary = pdf["SALARY"].max() if not pdf.empty else 0
salary_ticks = list(range(tick_step, int(max_salary) + tick_step, tick_step))

fig = px.box(
    pdf,
    x="NAICS2_NAME",
    y="SALARY",
    title="Salary Distribution by Industry",
    color_discrete_sequence=["#EF553B"],  # bright red for boxes
    boxmode="group",
    points="all",
)

fig.update_layout(
    title=dict(
        text="Salary Distribution by Industry",
        font=dict(size=30, family="Arial", color="#990000", weight="bold"),  # dark red title
    ),
    xaxis=dict(
        title=dict(
            text="Industry",
            font=dict(size=14, family="Arial", color="#B22222", weight="bold"),  # firebrick red
        ),
        tickangle=45,
        tickfont=dict(size=12, family="Arial", color="#B22222", weight="bold"),
        showline=True,
        linewidth=2,
        linecolor="#B22222",
        mirror=True,
        showgrid=False,
        categoryorder="array",
        categoryarray=sorted_industries.tolist(),
    ),
    yaxis=dict(
        title=dict(
            text="Salary (K $)",
            font=dict(size=14, family="Arial", color="#800000", weight="bold"),  # maroon
        ),
        tickvals=salary_ticks,
        ticktext=[f"{value // 1000}K" for value in salary_ticks],
        tickfont=dict(size=12, family="Arial", color="#800000", weight="bold"),
        showline=True,
        linewidth=2,
        linecolor="#800000",
        mirror=True,
        showgrid=False,
        gridcolor="#F5B7B1",
        gridwidth=0.5,
    ),
    font=dict(family="Arial", size=12, color="#800000"),
    boxgap=0.7,
    plot_bgcolor="#FFF0F0",
    paper_bgcolor="#FFF5F5",
    showlegend=False,
    height=900,
    width=1100,
)

fig.show()

# Writing fails if the target directory is missing, so create it first.
os.makedirs("output", exist_ok=True)
fig.write_html("output/Q2.html")
fig.write_image("output/Q2.svg", width=1100, height=900, scale=1)
[Stage 7:> (0 + 1) / 1]
5 Salary Analysis by ONET Occupation Type (Bubble Chart)
# Expose the postings DataFrame to Spark SQL under the view name "Job_Postings"
# (replacing any existing view with that name).
df.createOrReplaceTempView(name="Job_Postings")
25/09/24 21:08:21 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
# Q3: bubble chart of the 10 most frequently posted titles, showing each one's
# median salary (y) and posting count (bubble size and colour). Saves HTML and
# SVG to output/.
import os

import plotly.express as px

# NULL salaries are skipped by PERCENTILE, whereas COUNT(*) counts every
# posting in the group, including those without a salary.
# NOTE(review): this groups by TITLE_NAME but aliases/labels it as an ONET
# occupation; confirm that is intended rather than grouping by ONET_NAME.
salary_analysis = spark.sql("""
    SELECT
        TITLE_NAME AS ONET_NAME,
        PERCENTILE(SALARY, 0.5) AS Median_Salary,
        COUNT(*) AS Job_Postings
    FROM Job_Postings
    GROUP BY TITLE_NAME
    ORDER BY Job_Postings DESC
    LIMIT 10
""")
salary_pd = salary_analysis.toPandas()

fig = px.scatter(
    salary_pd,
    x="ONET_NAME",
    y="Median_Salary",
    size="Job_Postings",
    title="Median Salary by ONET Occupation Type (Bubble Chart)",
    labels={
        "ONET_NAME": "ONET Occupation",
        "Median_Salary": "Median Salary",
        "Job_Postings": "Number of Job Postings",
    },
    hover_name="ONET_NAME",
    size_max=60,
    width=1000,
    height=600,
    color="Job_Postings",
    color_continuous_scale="Plasma",
)

fig.update_layout(
    font_family="Arial",
    font_size=14,
    title_font_size=25,
    xaxis_title="ONET Occupation",
    yaxis_title="Median Salary",
    plot_bgcolor="white",
    xaxis=dict(tickangle=-45, showline=True, linecolor="black"),
    yaxis=dict(showline=True, linecolor="black"),
)

fig.show()

# Writing fails if the target directory is missing, so create it first.
os.makedirs("output", exist_ok=True)
fig.write_html("output/Q3.html")
# Export at the same size the figure was laid out with (1000x600) so the SVG
# matches the on-screen chart instead of being re-flowed to another aspect ratio.
fig.write_image("output/Q3.svg", width=1000, height=600, scale=1)
[Stage 8:> (0 + 1) / 1]
6 Salary by Education Level
# Salary vs. experience by education tier:
#   1. tag each posting with an education group;
#   2. keep rows with usable (non-null, positive) experience and salary;
#   3. pull the two comparison groups into pandas with horizontal jitter for plotting.
lower_deg = ["Bachelor's", "Associate's", "GED", "No Education Listed", "High School"]
higher_deg = ["Master's Degree", "PHD or professional degree"]

# Every alternative is prefixed with an inline (?i) flag, so matching is
# case-insensitive. The lower tier is tested first, so a value matching both
# lists is labelled "Bachelor's or lower"; anything else (including NULL)
# falls through to "Other".
# NOTE(review): confirm these spellings match the actual EDUCATION_LEVELS_NAME
# values (e.g. punctuation in the PhD label); unmatched rows become "Other".
lower_deg_regex = "|".join("(?i)" + deg for deg in lower_deg)
higher_deg_regex = "|".join("(?i)" + deg for deg in higher_deg)

edu_group_expr = (
    when(col("EDUCATION_LEVELS_NAME").rlike(lower_deg_regex), "Bachelor's or lower")
    .when(col("EDUCATION_LEVELS_NAME").rlike(higher_deg_regex), "Master's or PHD")
    .otherwise("Other")
)

df = (
    df.withColumn("EDU_GROUP", edu_group_expr)
    .withColumn("MAX_YEARS_EXPERIENCE", col("MAX_YEARS_EXPERIENCE").cast("float"))
    .withColumn("AVERAGE_SALARY", col("AVERAGE_SALARY").cast("float"))
)

# Drop rows whose experience or salary is missing or not strictly positive.
df = df.filter(
    col("MAX_YEARS_EXPERIENCE").isNotNull()
    & col("AVERAGE_SALARY").isNotNull()
    & (col("MAX_YEARS_EXPERIENCE") > 0)
    & (col("AVERAGE_SALARY") > 0)
)

df_filtered = df.filter(col("EDU_GROUP").isin("Bachelor's or lower", "Master's or PHD"))
df_pd = df_filtered.toPandas()

import numpy as np

# Offset each point by a uniform draw in [-jitter_amount, jitter_amount) so
# postings with identical experience values don't overlap exactly.
jitter_amount = 0.15
df_pd['MAX_YEARS_EXPERIENCE_JITTER'] = df_pd['MAX_YEARS_EXPERIENCE'] + np.random.uniform(
    low=-jitter_amount, high=jitter_amount, size=df_pd.shape[0]
)